home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_weakref.py < prev    next >
Text File  |  2005-10-18  |  37KB  |  1,101 lines

  1. import gc
  2. import sys
  3. import unittest
  4. import UserList
  5. import weakref
  6.  
  7. from test import test_support
  8.  
  9.  
  10. class C:
  11.     def method(self):
  12.         pass
  13.  
  14.  
  15. class Callable:
  16.     bar = None
  17.  
  18.     def __call__(self, x):
  19.         self.bar = x
  20.  
  21.  
  22. def create_function():
  23.     def f(): pass
  24.     return f
  25.  
  26. def create_bound_method():
  27.     return C().method
  28.  
  29. def create_unbound_method():
  30.     return C.method
  31.  
  32.  
  33. class TestBase(unittest.TestCase):
  34.  
  35.     def setUp(self):
  36.         self.cbcalled = 0
  37.  
  38.     def callback(self, ref):
  39.         self.cbcalled += 1
  40.  
  41.  
  42. class ReferencesTestCase(TestBase):
  43.  
  44.     def test_basic_ref(self):
  45.         self.check_basic_ref(C)
  46.         self.check_basic_ref(create_function)
  47.         self.check_basic_ref(create_bound_method)
  48.         self.check_basic_ref(create_unbound_method)
  49.  
  50.         # Just make sure the tp_repr handler doesn't raise an exception.
  51.         # Live reference:
  52.         o = C()
  53.         wr = weakref.ref(o)
  54.         `wr`
  55.         # Dead reference:
  56.         del o
  57.         `wr`
  58.  
  59.     def test_basic_callback(self):
  60.         self.check_basic_callback(C)
  61.         self.check_basic_callback(create_function)
  62.         self.check_basic_callback(create_bound_method)
  63.         self.check_basic_callback(create_unbound_method)
  64.  
  65.     def test_multiple_callbacks(self):
  66.         o = C()
  67.         ref1 = weakref.ref(o, self.callback)
  68.         ref2 = weakref.ref(o, self.callback)
  69.         del o
  70.         self.assert_(ref1() is None,
  71.                      "expected reference to be invalidated")
  72.         self.assert_(ref2() is None,
  73.                      "expected reference to be invalidated")
  74.         self.assert_(self.cbcalled == 2,
  75.                      "callback not called the right number of times")
  76.  
  77.     def test_multiple_selfref_callbacks(self):
  78.         # Make sure all references are invalidated before callbacks are called
  79.         #
  80.         # What's important here is that we're using the first
  81.         # reference in the callback invoked on the second reference
  82.         # (the most recently created ref is cleaned up first).  This
  83.         # tests that all references to the object are invalidated
  84.         # before any of the callbacks are invoked, so that we only
  85.         # have one invocation of _weakref.c:cleanup_helper() active
  86.         # for a particular object at a time.
  87.         #
  88.         def callback(object, self=self):
  89.             self.ref()
  90.         c = C()
  91.         self.ref = weakref.ref(c, callback)
  92.         ref1 = weakref.ref(c, callback)
  93.         del c
  94.  
  95.     def test_proxy_ref(self):
  96.         o = C()
  97.         o.bar = 1
  98.         ref1 = weakref.proxy(o, self.callback)
  99.         ref2 = weakref.proxy(o, self.callback)
  100.         del o
  101.  
  102.         def check(proxy):
  103.             proxy.bar
  104.  
  105.         self.assertRaises(weakref.ReferenceError, check, ref1)
  106.         self.assertRaises(weakref.ReferenceError, check, ref2)
  107.         self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
  108.         self.assert_(self.cbcalled == 2)
  109.  
  110.     def check_basic_ref(self, factory):
  111.         o = factory()
  112.         ref = weakref.ref(o)
  113.         self.assert_(ref() is not None,
  114.                      "weak reference to live object should be live")
  115.         o2 = ref()
  116.         self.assert_(o is o2,
  117.                      "<ref>() should return original object if live")
  118.  
  119.     def check_basic_callback(self, factory):
  120.         self.cbcalled = 0
  121.         o = factory()
  122.         ref = weakref.ref(o, self.callback)
  123.         del o
  124.         self.assert_(self.cbcalled == 1,
  125.                      "callback did not properly set 'cbcalled'")
  126.         self.assert_(ref() is None,
  127.                      "ref2 should be dead after deleting object reference")
  128.  
  129.     def test_ref_reuse(self):
  130.         o = C()
  131.         ref1 = weakref.ref(o)
  132.         # create a proxy to make sure that there's an intervening creation
  133.         # between these two; it should make no difference
  134.         proxy = weakref.proxy(o)
  135.         ref2 = weakref.ref(o)
  136.         self.assert_(ref1 is ref2,
  137.                      "reference object w/out callback should be re-used")
  138.  
  139.         o = C()
  140.         proxy = weakref.proxy(o)
  141.         ref1 = weakref.ref(o)
  142.         ref2 = weakref.ref(o)
  143.         self.assert_(ref1 is ref2,
  144.                      "reference object w/out callback should be re-used")
  145.         self.assert_(weakref.getweakrefcount(o) == 2,
  146.                      "wrong weak ref count for object")
  147.         del proxy
  148.         self.assert_(weakref.getweakrefcount(o) == 1,
  149.                      "wrong weak ref count for object after deleting proxy")
  150.  
  151.     def test_proxy_reuse(self):
  152.         o = C()
  153.         proxy1 = weakref.proxy(o)
  154.         ref = weakref.ref(o)
  155.         proxy2 = weakref.proxy(o)
  156.         self.assert_(proxy1 is proxy2,
  157.                      "proxy object w/out callback should have been re-used")
  158.  
  159.     def test_basic_proxy(self):
  160.         o = C()
  161.         self.check_proxy(o, weakref.proxy(o))
  162.  
  163.         L = UserList.UserList()
  164.         p = weakref.proxy(L)
  165.         self.failIf(p, "proxy for empty UserList should be false")
  166.         p.append(12)
  167.         self.assertEqual(len(L), 1)
  168.         self.failUnless(p, "proxy for non-empty UserList should be true")
  169.         p[:] = [2, 3]
  170.         self.assertEqual(len(L), 2)
  171.         self.assertEqual(len(p), 2)
  172.         self.failUnless(3 in p,
  173.                         "proxy didn't support __contains__() properly")
  174.         p[1] = 5
  175.         self.assertEqual(L[1], 5)
  176.         self.assertEqual(p[1], 5)
  177.         L2 = UserList.UserList(L)
  178.         p2 = weakref.proxy(L2)
  179.         self.assertEqual(p, p2)
  180.         ## self.assertEqual(repr(L2), repr(p2))
  181.         L3 = UserList.UserList(range(10))
  182.         p3 = weakref.proxy(L3)
  183.         self.assertEqual(L3[:], p3[:])
  184.         self.assertEqual(L3[5:], p3[5:])
  185.         self.assertEqual(L3[:5], p3[:5])
  186.         self.assertEqual(L3[2:5], p3[2:5])
  187.  
  188.     # The PyWeakref_* C API is documented as allowing either NULL or
  189.     # None as the value for the callback, where either means "no
  190.     # callback".  The "no callback" ref and proxy objects are supposed
  191.     # to be shared so long as they exist by all callers so long as
  192.     # they are active.  In Python 2.3.3 and earlier, this guaranttee
  193.     # was not honored, and was broken in different ways for
  194.     # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)
  195.  
  196.     def test_shared_ref_without_callback(self):
  197.         self.check_shared_without_callback(weakref.ref)
  198.  
  199.     def test_shared_proxy_without_callback(self):
  200.         self.check_shared_without_callback(weakref.proxy)
  201.  
  202.     def check_shared_without_callback(self, makeref):
  203.         o = Object(1)
  204.         p1 = makeref(o, None)
  205.         p2 = makeref(o, None)
  206.         self.assert_(p1 is p2, "both callbacks were None in the C API")
  207.         del p1, p2
  208.         p1 = makeref(o)
  209.         p2 = makeref(o, None)
  210.         self.assert_(p1 is p2, "callbacks were NULL, None in the C API")
  211.         del p1, p2
  212.         p1 = makeref(o)
  213.         p2 = makeref(o)
  214.         self.assert_(p1 is p2, "both callbacks were NULL in the C API")
  215.         del p1, p2
  216.         p1 = makeref(o, None)
  217.         p2 = makeref(o)
  218.         self.assert_(p1 is p2, "callbacks were None, NULL in the C API")
  219.  
  220.     def test_callable_proxy(self):
  221.         o = Callable()
  222.         ref1 = weakref.proxy(o)
  223.  
  224.         self.check_proxy(o, ref1)
  225.  
  226.         self.assert_(type(ref1) is weakref.CallableProxyType,
  227.                      "proxy is not of callable type")
  228.         ref1('twinkies!')
  229.         self.assert_(o.bar == 'twinkies!',
  230.                      "call through proxy not passed through to original")
  231.         ref1(x='Splat.')
  232.         self.assert_(o.bar == 'Splat.',
  233.                      "call through proxy not passed through to original")
  234.  
  235.         # expect due to too few args
  236.         self.assertRaises(TypeError, ref1)
  237.  
  238.         # expect due to too many args
  239.         self.assertRaises(TypeError, ref1, 1, 2, 3)
  240.  
  241.     def check_proxy(self, o, proxy):
  242.         o.foo = 1
  243.         self.assert_(proxy.foo == 1,
  244.                      "proxy does not reflect attribute addition")
  245.         o.foo = 2
  246.         self.assert_(proxy.foo == 2,
  247.                      "proxy does not reflect attribute modification")
  248.         del o.foo
  249.         self.assert_(not hasattr(proxy, 'foo'),
  250.                      "proxy does not reflect attribute removal")
  251.  
  252.         proxy.foo = 1
  253.         self.assert_(o.foo == 1,
  254.                      "object does not reflect attribute addition via proxy")
  255.         proxy.foo = 2
  256.         self.assert_(
  257.             o.foo == 2,
  258.             "object does not reflect attribute modification via proxy")
  259.         del proxy.foo
  260.         self.assert_(not hasattr(o, 'foo'),
  261.                      "object does not reflect attribute removal via proxy")
  262.  
  263.     def test_proxy_deletion(self):
  264.         # Test clearing of SF bug #762891
  265.         class Foo:
  266.             result = None
  267.             def __delitem__(self, accessor):
  268.                 self.result = accessor
  269.         g = Foo()
  270.         f = weakref.proxy(g)
  271.         del f[0]
  272.         self.assertEqual(f.result, 0)
  273.  
  274.     def test_proxy_bool(self):
  275.         # Test clearing of SF bug #1170766
  276.         class List(list): pass
  277.         lyst = List()
  278.         self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
  279.  
  280.     def test_getweakrefcount(self):
  281.         o = C()
  282.         ref1 = weakref.ref(o)
  283.         ref2 = weakref.ref(o, self.callback)
  284.         self.assert_(weakref.getweakrefcount(o) == 2,
  285.                      "got wrong number of weak reference objects")
  286.  
  287.         proxy1 = weakref.proxy(o)
  288.         proxy2 = weakref.proxy(o, self.callback)
  289.         self.assert_(weakref.getweakrefcount(o) == 4,
  290.                      "got wrong number of weak reference objects")
  291.  
  292.         del ref1, ref2, proxy1, proxy2
  293.         self.assert_(weakref.getweakrefcount(o) == 0,
  294.                      "weak reference objects not unlinked from"
  295.                      " referent when discarded.")
  296.  
  297.         # assumes ints do not support weakrefs
  298.         self.assert_(weakref.getweakrefcount(1) == 0,
  299.                      "got wrong number of weak reference objects for int")
  300.  
  301.     def test_getweakrefs(self):
  302.         o = C()
  303.         ref1 = weakref.ref(o, self.callback)
  304.         ref2 = weakref.ref(o, self.callback)
  305.         del ref1
  306.         self.assert_(weakref.getweakrefs(o) == [ref2],
  307.                      "list of refs does not match")
  308.  
  309.         o = C()
  310.         ref1 = weakref.ref(o, self.callback)
  311.         ref2 = weakref.ref(o, self.callback)
  312.         del ref2
  313.         self.assert_(weakref.getweakrefs(o) == [ref1],
  314.                      "list of refs does not match")
  315.  
  316.         del ref1
  317.         self.assert_(weakref.getweakrefs(o) == [],
  318.                      "list of refs not cleared")
  319.  
  320.         # assumes ints do not support weakrefs
  321.         self.assert_(weakref.getweakrefs(1) == [],
  322.                      "list of refs does not match for int")
  323.  
  324.     def test_newstyle_number_ops(self):
  325.         class F(float):
  326.             pass
  327.         f = F(2.0)
  328.         p = weakref.proxy(f)
  329.         self.assert_(p + 1.0 == 3.0)
  330.         self.assert_(1.0 + p == 3.0)  # this used to SEGV
  331.  
  332.     def test_callbacks_protected(self):
  333.         # Callbacks protected from already-set exceptions?
  334.         # Regression test for SF bug #478534.
  335.         class BogusError(Exception):
  336.             pass
  337.         data = {}
  338.         def remove(k):
  339.             del data[k]
  340.         def encapsulate():
  341.             f = lambda : ()
  342.             data[weakref.ref(f, remove)] = None
  343.             raise BogusError
  344.         try:
  345.             encapsulate()
  346.         except BogusError:
  347.             pass
  348.         else:
  349.             self.fail("exception not properly restored")
  350.         try:
  351.             encapsulate()
  352.         except BogusError:
  353.             pass
  354.         else:
  355.             self.fail("exception not properly restored")
  356.  
  357.     def test_sf_bug_840829(self):
  358.         # "weakref callbacks and gc corrupt memory"
  359.         # subtype_dealloc erroneously exposed a new-style instance
  360.         # already in the process of getting deallocated to gc,
  361.         # causing double-deallocation if the instance had a weakref
  362.         # callback that triggered gc.
  363.         # If the bug exists, there probably won't be an obvious symptom
  364.         # in a release build.  In a debug build, a segfault will occur
  365.         # when the second attempt to remove the instance from the "list
  366.         # of all objects" occurs.
  367.  
  368.         import gc
  369.  
  370.         class C(object):
  371.             pass
  372.  
  373.         c = C()
  374.         wr = weakref.ref(c, lambda ignore: gc.collect())
  375.         del c
  376.  
  377.         # There endeth the first part.  It gets worse.
  378.         del wr
  379.  
  380.         c1 = C()
  381.         c1.i = C()
  382.         wr = weakref.ref(c1.i, lambda ignore: gc.collect())
  383.  
  384.         c2 = C()
  385.         c2.c1 = c1
  386.         del c1  # still alive because c2 points to it
  387.  
  388.         # Now when subtype_dealloc gets called on c2, it's not enough just
  389.         # that c2 is immune from gc while the weakref callbacks associated
  390.         # with c2 execute (there are none in this 2nd half of the test, btw).
  391.         # subtype_dealloc goes on to call the base classes' deallocs too,
  392.         # so any gc triggered by weakref callbacks associated with anything
  393.         # torn down by a base class dealloc can also trigger double
  394.         # deallocation of c2.
  395.         del c2
  396.  
  397.     def test_callback_in_cycle_1(self):
  398.         import gc
  399.  
  400.         class J(object):
  401.             pass
  402.  
  403.         class II(object):
  404.             def acallback(self, ignore):
  405.                 self.J
  406.  
  407.         I = II()
  408.         I.J = J
  409.         I.wr = weakref.ref(J, I.acallback)
  410.  
  411.         # Now J and II are each in a self-cycle (as all new-style class
  412.         # objects are, since their __mro__ points back to them).  I holds
  413.         # both a weak reference (I.wr) and a strong reference (I.J) to class
  414.         # J.  I is also in a cycle (I.wr points to a weakref that references
  415.         # I.acallback).  When we del these three, they all become trash, but
  416.         # the cycles prevent any of them from getting cleaned up immediately.
  417.         # Instead they have to wait for cyclic gc to deduce that they're
  418.         # trash.
  419.         #
  420.         # gc used to call tp_clear on all of them, and the order in which
  421.         # it does that is pretty accidental.  The exact order in which we
  422.         # built up these things manages to provoke gc into running tp_clear
  423.         # in just the right order (I last).  Calling tp_clear on II leaves
  424.         # behind an insane class object (its __mro__ becomes NULL).  Calling
  425.         # tp_clear on J breaks its self-cycle, but J doesn't get deleted
  426.         # just then because of the strong reference from I.J.  Calling
  427.         # tp_clear on I starts to clear I's __dict__, and just happens to
  428.         # clear I.J first -- I.wr is still intact.  That removes the last
  429.         # reference to J, which triggers the weakref callback.  The callback
  430.         # tries to do "self.J", and instances of new-style classes look up
  431.         # attributes ("J") in the class dict first.  The class (II) wants to
  432.         # search II.__mro__, but that's NULL.   The result was a segfault in
  433.         # a release build, and an assert failure in a debug build.
  434.         del I, J, II
  435.         gc.collect()
  436.  
  437.     def test_callback_in_cycle_2(self):
  438.         import gc
  439.  
  440.         # This is just like test_callback_in_cycle_1, except that II is an
  441.         # old-style class.  The symptom is different then:  an instance of an
  442.         # old-style class looks in its own __dict__ first.  'J' happens to
  443.         # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
  444.         # __dict__, so the attribute isn't found.  The difference is that
  445.         # the old-style II doesn't have a NULL __mro__ (it doesn't have any
  446.         # __mro__), so no segfault occurs.  Instead it got:
  447.         #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
  448.         #    Exception exceptions.AttributeError:
  449.         #   "II instance has no attribute 'J'" in <bound method II.acallback
  450.         #       of <?.II instance at 0x00B9B4B8>> ignored
  451.  
  452.         class J(object):
  453.             pass
  454.  
  455.         class II:
  456.             def acallback(self, ignore):
  457.                 self.J
  458.  
  459.         I = II()
  460.         I.J = J
  461.         I.wr = weakref.ref(J, I.acallback)
  462.  
  463.         del I, J, II
  464.         gc.collect()
  465.  
  466.     def test_callback_in_cycle_3(self):
  467.         import gc
  468.  
  469.         # This one broke the first patch that fixed the last two.  In this
  470.         # case, the objects reachable from the callback aren't also reachable
  471.         # from the object (c1) *triggering* the callback:  you can get to
  472.         # c1 from c2, but not vice-versa.  The result was that c2's __dict__
  473.         # got tp_clear'ed by the time the c2.cb callback got invoked.
  474.  
  475.         class C:
  476.             def cb(self, ignore):
  477.                 self.me
  478.                 self.c1
  479.                 self.wr
  480.  
  481.         c1, c2 = C(), C()
  482.  
  483.         c2.me = c2
  484.         c2.c1 = c1
  485.         c2.wr = weakref.ref(c1, c2.cb)
  486.  
  487.         del c1, c2
  488.         gc.collect()
  489.  
  490.     def test_callback_in_cycle_4(self):
  491.         import gc
  492.  
  493.         # Like test_callback_in_cycle_3, except c2 and c1 have different
  494.         # classes.  c2's class (C) isn't reachable from c1 then, so protecting
  495.         # objects reachable from the dying object (c1) isn't enough to stop
  496.         # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
  497.         # The result was a segfault (C.__mro__ was NULL when the callback
  498.         # tried to look up self.me).
  499.  
  500.         class C(object):
  501.             def cb(self, ignore):
  502.                 self.me
  503.                 self.c1
  504.                 self.wr
  505.  
  506.         class D:
  507.             pass
  508.  
  509.         c1, c2 = D(), C()
  510.  
  511.         c2.me = c2
  512.         c2.c1 = c1
  513.         c2.wr = weakref.ref(c1, c2.cb)
  514.  
  515.         del c1, c2, C, D
  516.         gc.collect()
  517.  
  518.     def test_callback_in_cycle_resurrection(self):
  519.         import gc
  520.  
  521.         # Do something nasty in a weakref callback:  resurrect objects
  522.         # from dead cycles.  For this to be attempted, the weakref and
  523.         # its callback must also be part of the cyclic trash (else the
  524.         # objects reachable via the callback couldn't be in cyclic trash
  525.         # to begin with -- the callback would act like an external root).
  526.         # But gc clears trash weakrefs with callbacks early now, which
  527.         # disables the callbacks, so the callbacks shouldn't get called
  528.         # at all (and so nothing actually gets resurrected).
  529.  
  530.         alist = []
  531.         class C(object):
  532.             def __init__(self, value):
  533.                 self.attribute = value
  534.  
  535.             def acallback(self, ignore):
  536.                 alist.append(self.c)
  537.  
  538.         c1, c2 = C(1), C(2)
  539.         c1.c = c2
  540.         c2.c = c1
  541.         c1.wr = weakref.ref(c2, c1.acallback)
  542.         c2.wr = weakref.ref(c1, c2.acallback)
  543.  
  544.         def C_went_away(ignore):
  545.             alist.append("C went away")
  546.         wr = weakref.ref(C, C_went_away)
  547.  
  548.         del c1, c2, C   # make them all trash
  549.         self.assertEqual(alist, [])  # del isn't enough to reclaim anything
  550.  
  551.         gc.collect()
  552.         # c1.wr and c2.wr were part of the cyclic trash, so should have
  553.         # been cleared without their callbacks executing.  OTOH, the weakref
  554.         # to C is bound to a function local (wr), and wasn't trash, so that
  555.         # callback should have been invoked when C went away.
  556.         self.assertEqual(alist, ["C went away"])
  557.         # The remaining weakref should be dead now (its callback ran).
  558.         self.assertEqual(wr(), None)
  559.  
  560.         del alist[:]
  561.         gc.collect()
  562.         self.assertEqual(alist, [])
  563.  
  564.     def test_callbacks_on_callback(self):
  565.         import gc
  566.  
  567.         # Set up weakref callbacks *on* weakref callbacks.
  568.         alist = []
  569.         def safe_callback(ignore):
  570.             alist.append("safe_callback called")
  571.  
  572.         class C(object):
  573.             def cb(self, ignore):
  574.                 alist.append("cb called")
  575.  
  576.         c, d = C(), C()
  577.         c.other = d
  578.         d.other = c
  579.         callback = c.cb
  580.         c.wr = weakref.ref(d, callback)     # this won't trigger
  581.         d.wr = weakref.ref(callback, d.cb)  # ditto
  582.         external_wr = weakref.ref(callback, safe_callback)  # but this will
  583.         self.assert_(external_wr() is callback)
  584.  
  585.         # The weakrefs attached to c and d should get cleared, so that
  586.         # C.cb is never called.  But external_wr isn't part of the cyclic
  587.         # trash, and no cyclic trash is reachable from it, so safe_callback
  588.         # should get invoked when the bound method object callback (c.cb)
  589.         # -- which is itself a callback, and also part of the cyclic trash --
  590.         # gets reclaimed at the end of gc.
  591.  
  592.         del callback, c, d, C
  593.         self.assertEqual(alist, [])  # del isn't enough to clean up cycles
  594.         gc.collect()
  595.         self.assertEqual(alist, ["safe_callback called"])
  596.         self.assertEqual(external_wr(), None)
  597.  
  598.         del alist[:]
  599.         gc.collect()
  600.         self.assertEqual(alist, [])
  601.  
  602.     def test_gc_during_ref_creation(self):
  603.         self.check_gc_during_creation(weakref.ref)
  604.  
  605.     def test_gc_during_proxy_creation(self):
  606.         self.check_gc_during_creation(weakref.proxy)
  607.  
  608.     def check_gc_during_creation(self, makeref):
  609.         thresholds = gc.get_threshold()
  610.         gc.set_threshold(1, 1, 1)
  611.         gc.collect()
  612.         class A:
  613.             pass
  614.  
  615.         def callback(*args):
  616.             pass
  617.  
  618.         referenced = A()
  619.  
  620.         a = A()
  621.         a.a = a
  622.         a.wr = makeref(referenced)
  623.  
  624.         try:
  625.             # now make sure the object and the ref get labeled as
  626.             # cyclic trash:
  627.             a = A()
  628.             weakref.ref(referenced, callback)
  629.  
  630.         finally:
  631.             gc.set_threshold(*thresholds)
  632.  
  633.  
  634. class SubclassableWeakrefTestCase(unittest.TestCase):
  635.  
  636.     def test_subclass_refs(self):
  637.         class MyRef(weakref.ref):
  638.             def __init__(self, ob, callback=None, value=42):
  639.                 self.value = value
  640.                 super(MyRef, self).__init__(ob, callback)
  641.             def __call__(self):
  642.                 self.called = True
  643.                 return super(MyRef, self).__call__()
  644.         o = Object("foo")
  645.         mr = MyRef(o, value=24)
  646.         self.assert_(mr() is o)
  647.         self.assert_(mr.called)
  648.         self.assertEqual(mr.value, 24)
  649.         del o
  650.         self.assert_(mr() is None)
  651.         self.assert_(mr.called)
  652.  
  653.     def test_subclass_refs_dont_replace_standard_refs(self):
  654.         class MyRef(weakref.ref):
  655.             pass
  656.         o = Object(42)
  657.         r1 = MyRef(o)
  658.         r2 = weakref.ref(o)
  659.         self.assert_(r1 is not r2)
  660.         self.assertEqual(weakref.getweakrefs(o), [r2, r1])
  661.         self.assertEqual(weakref.getweakrefcount(o), 2)
  662.         r3 = MyRef(o)
  663.         self.assertEqual(weakref.getweakrefcount(o), 3)
  664.         refs = weakref.getweakrefs(o)
  665.         self.assertEqual(len(refs), 3)
  666.         self.assert_(r2 is refs[0])
  667.         self.assert_(r1 in refs[1:])
  668.         self.assert_(r3 in refs[1:])
  669.  
  670.     def test_subclass_refs_dont_conflate_callbacks(self):
  671.         class MyRef(weakref.ref):
  672.             pass
  673.         o = Object(42)
  674.         r1 = MyRef(o, id)
  675.         r2 = MyRef(o, str)
  676.         self.assert_(r1 is not r2)
  677.         refs = weakref.getweakrefs(o)
  678.         self.assert_(r1 in refs)
  679.         self.assert_(r2 in refs)
  680.  
  681.     def test_subclass_refs_with_slots(self):
  682.         class MyRef(weakref.ref):
  683.             __slots__ = "slot1", "slot2"
  684.             def __new__(type, ob, callback, slot1, slot2):
  685.                 return weakref.ref.__new__(type, ob, callback)
  686.             def __init__(self, ob, callback, slot1, slot2):
  687.                 self.slot1 = slot1
  688.                 self.slot2 = slot2
  689.             def meth(self):
  690.                 return self.slot1 + self.slot2
  691.         o = Object(42)
  692.         r = MyRef(o, None, "abc", "def")
  693.         self.assertEqual(r.slot1, "abc")
  694.         self.assertEqual(r.slot2, "def")
  695.         self.assertEqual(r.meth(), "abcdef")
  696.         self.failIf(hasattr(r, "__dict__"))
  697.  
  698.  
  699. class Object:
  700.     def __init__(self, arg):
  701.         self.arg = arg
  702.     def __repr__(self):
  703.         return "<Object %r>" % self.arg
  704.  
  705.  
  706. class MappingTestCase(TestBase):
  707.  
  708.     COUNT = 10
  709.  
  710.     def test_weak_values(self):
  711.         #
  712.         #  This exercises d.copy(), d.items(), d[], del d[], len(d).
  713.         #
  714.         dict, objects = self.make_weak_valued_dict()
  715.         for o in objects:
  716.             self.assert_(weakref.getweakrefcount(o) == 1,
  717.                          "wrong number of weak references to %r!" % o)
  718.             self.assert_(o is dict[o.arg],
  719.                          "wrong object returned by weak dict!")
  720.         items1 = dict.items()
  721.         items2 = dict.copy().items()
  722.         items1.sort()
  723.         items2.sort()
  724.         self.assert_(items1 == items2,
  725.                      "cloning of weak-valued dictionary did not work!")
  726.         del items1, items2
  727.         self.assert_(len(dict) == self.COUNT)
  728.         del objects[0]
  729.         self.assert_(len(dict) == (self.COUNT - 1),
  730.                      "deleting object did not cause dictionary update")
  731.         del objects, o
  732.         self.assert_(len(dict) == 0,
  733.                      "deleting the values did not clear the dictionary")
  734.         # regression on SF bug #447152:
  735.         dict = weakref.WeakValueDictionary()
  736.         self.assertRaises(KeyError, dict.__getitem__, 1)
  737.         dict[2] = C()
  738.         self.assertRaises(KeyError, dict.__getitem__, 2)
  739.  
  740.     def test_weak_keys(self):
  741.         #
  742.         #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
  743.         #  len(d), d.has_key().
  744.         #
  745.         dict, objects = self.make_weak_keyed_dict()
  746.         for o in objects:
  747.             self.assert_(weakref.getweakrefcount(o) == 1,
  748.                          "wrong number of weak references to %r!" % o)
  749.             self.assert_(o.arg is dict[o],
  750.                          "wrong object returned by weak dict!")
  751.         items1 = dict.items()
  752.         items2 = dict.copy().items()
  753.         self.assert_(set(items1) == set(items2),
  754.                      "cloning of weak-keyed dictionary did not work!")
  755.         del items1, items2
  756.         self.assert_(len(dict) == self.COUNT)
  757.         del objects[0]
  758.         self.assert_(len(dict) == (self.COUNT - 1),
  759.                      "deleting object did not cause dictionary update")
  760.         del objects, o
  761.         self.assert_(len(dict) == 0,
  762.                      "deleting the keys did not clear the dictionary")
  763.         o = Object(42)
  764.         dict[o] = "What is the meaning of the universe?"
  765.         self.assert_(dict.has_key(o))
  766.         self.assert_(not dict.has_key(34))
  767.  
  768.     def test_weak_keyed_iters(self):
  769.         dict, objects = self.make_weak_keyed_dict()
  770.         self.check_iters(dict)
  771.  
  772.     def test_weak_valued_iters(self):
  773.         dict, objects = self.make_weak_valued_dict()
  774.         self.check_iters(dict)
  775.  
  776.     def check_iters(self, dict):
  777.         # item iterator:
  778.         items = dict.items()
  779.         for item in dict.iteritems():
  780.             items.remove(item)
  781.         self.assert_(len(items) == 0, "iteritems() did not touch all items")
  782.  
  783.         # key iterator, via __iter__():
  784.         keys = dict.keys()
  785.         for k in dict:
  786.             keys.remove(k)
  787.         self.assert_(len(keys) == 0, "__iter__() did not touch all keys")
  788.  
  789.         # key iterator, via iterkeys():
  790.         keys = dict.keys()
  791.         for k in dict.iterkeys():
  792.             keys.remove(k)
  793.         self.assert_(len(keys) == 0, "iterkeys() did not touch all keys")
  794.  
  795.         # value iterator:
  796.         values = dict.values()
  797.         for v in dict.itervalues():
  798.             values.remove(v)
  799.         self.assert_(len(values) == 0,
  800.                      "itervalues() did not touch all values")
  801.  
  802.     def test_make_weak_keyed_dict_from_dict(self):
  803.         o = Object(3)
  804.         dict = weakref.WeakKeyDictionary({o:364})
  805.         self.assert_(dict[o] == 364)
  806.  
  807.     def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
  808.         o = Object(3)
  809.         dict = weakref.WeakKeyDictionary({o:364})
  810.         dict2 = weakref.WeakKeyDictionary(dict)
  811.         self.assert_(dict[o] == 364)
  812.  
  813.     def make_weak_keyed_dict(self):
  814.         dict = weakref.WeakKeyDictionary()
  815.         objects = map(Object, range(self.COUNT))
  816.         for o in objects:
  817.             dict[o] = o.arg
  818.         return dict, objects
  819.  
  820.     def make_weak_valued_dict(self):
  821.         dict = weakref.WeakValueDictionary()
  822.         objects = map(Object, range(self.COUNT))
  823.         for o in objects:
  824.             dict[o.arg] = o
  825.         return dict, objects
  826.  
  827.     def check_popitem(self, klass, key1, value1, key2, value2):
  828.         weakdict = klass()
  829.         weakdict[key1] = value1
  830.         weakdict[key2] = value2
  831.         self.assert_(len(weakdict) == 2)
  832.         k, v = weakdict.popitem()
  833.         self.assert_(len(weakdict) == 1)
  834.         if k is key1:
  835.             self.assert_(v is value1)
  836.         else:
  837.             self.assert_(v is value2)
  838.         k, v = weakdict.popitem()
  839.         self.assert_(len(weakdict) == 0)
  840.         if k is key1:
  841.             self.assert_(v is value1)
  842.         else:
  843.             self.assert_(v is value2)
  844.  
  845.     def test_weak_valued_dict_popitem(self):
  846.         self.check_popitem(weakref.WeakValueDictionary,
  847.                            "key1", C(), "key2", C())
  848.  
  849.     def test_weak_keyed_dict_popitem(self):
  850.         self.check_popitem(weakref.WeakKeyDictionary,
  851.                            C(), "value 1", C(), "value 2")
  852.  
  853.     def check_setdefault(self, klass, key, value1, value2):
  854.         self.assert_(value1 is not value2,
  855.                      "invalid test"
  856.                      " -- value parameters must be distinct objects")
  857.         weakdict = klass()
  858.         o = weakdict.setdefault(key, value1)
  859.         self.assert_(o is value1)
  860.         self.assert_(weakdict.has_key(key))
  861.         self.assert_(weakdict.get(key) is value1)
  862.         self.assert_(weakdict[key] is value1)
  863.  
  864.         o = weakdict.setdefault(key, value2)
  865.         self.assert_(o is value1)
  866.         self.assert_(weakdict.has_key(key))
  867.         self.assert_(weakdict.get(key) is value1)
  868.         self.assert_(weakdict[key] is value1)
  869.  
  870.     def test_weak_valued_dict_setdefault(self):
  871.         self.check_setdefault(weakref.WeakValueDictionary,
  872.                               "key", C(), C())
  873.  
  874.     def test_weak_keyed_dict_setdefault(self):
  875.         self.check_setdefault(weakref.WeakKeyDictionary,
  876.                               C(), "value 1", "value 2")
  877.  
  878.     def check_update(self, klass, dict):
  879.         #
  880.         #  This exercises d.update(), len(d), d.keys(), d.has_key(),
  881.         #  d.get(), d[].
  882.         #
  883.         weakdict = klass()
  884.         weakdict.update(dict)
  885.         self.assert_(len(weakdict) == len(dict))
  886.         for k in weakdict.keys():
  887.             self.assert_(dict.has_key(k),
  888.                          "mysterious new key appeared in weak dict")
  889.             v = dict.get(k)
  890.             self.assert_(v is weakdict[k])
  891.             self.assert_(v is weakdict.get(k))
  892.         for k in dict.keys():
  893.             self.assert_(weakdict.has_key(k),
  894.                          "original key disappeared in weak dict")
  895.             v = dict[k]
  896.             self.assert_(v is weakdict[k])
  897.             self.assert_(v is weakdict.get(k))
  898.  
  899.     def test_weak_valued_dict_update(self):
  900.         self.check_update(weakref.WeakValueDictionary,
  901.                           {1: C(), 'a': C(), C(): C()})
  902.  
  903.     def test_weak_keyed_dict_update(self):
  904.         self.check_update(weakref.WeakKeyDictionary,
  905.                           {C(): 1, C(): 2, C(): 3})
  906.  
  907.     def test_weak_keyed_delitem(self):
  908.         d = weakref.WeakKeyDictionary()
  909.         o1 = Object('1')
  910.         o2 = Object('2')
  911.         d[o1] = 'something'
  912.         d[o2] = 'something'
  913.         self.assert_(len(d) == 2)
  914.         del d[o1]
  915.         self.assert_(len(d) == 1)
  916.         self.assert_(d.keys() == [o2])
  917.  
  918.     def test_weak_valued_delitem(self):
  919.         d = weakref.WeakValueDictionary()
  920.         o1 = Object('1')
  921.         o2 = Object('2')
  922.         d['something'] = o1
  923.         d['something else'] = o2
  924.         self.assert_(len(d) == 2)
  925.         del d['something']
  926.         self.assert_(len(d) == 1)
  927.         self.assert_(d.items() == [('something else', o2)])
  928.  
  929.     def test_weak_keyed_bad_delitem(self):
  930.         d = weakref.WeakKeyDictionary()
  931.         o = Object('1')
  932.         # An attempt to delete an object that isn't there should raise
  933.         # KeyError.  It didn't before 2.3.
  934.         self.assertRaises(KeyError, d.__delitem__, o)
  935.         self.assertRaises(KeyError, d.__getitem__, o)
  936.  
  937.         # If a key isn't of a weakly referencable type, __getitem__ and
  938.         # __setitem__ raise TypeError.  __delitem__ should too.
  939.         self.assertRaises(TypeError, d.__delitem__,  13)
  940.         self.assertRaises(TypeError, d.__getitem__,  13)
  941.         self.assertRaises(TypeError, d.__setitem__,  13, 13)
  942.  
  943.     def test_weak_keyed_cascading_deletes(self):
  944.         # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
  945.         # over the keys via self.data.iterkeys().  If things vanished from
  946.         # the dict during this (or got added), that caused a RuntimeError.
  947.  
  948.         d = weakref.WeakKeyDictionary()
  949.         mutate = False
  950.  
  951.         class C(object):
  952.             def __init__(self, i):
  953.                 self.value = i
  954.             def __hash__(self):
  955.                 return hash(self.value)
  956.             def __eq__(self, other):
  957.                 if mutate:
  958.                     # Side effect that mutates the dict, by removing the
  959.                     # last strong reference to a key.
  960.                     del objs[-1]
  961.                 return self.value == other.value
  962.  
  963.         objs = [C(i) for i in range(4)]
  964.         for o in objs:
  965.             d[o] = o.value
  966.         del o   # now the only strong references to keys are in objs
  967.         # Find the order in which iterkeys sees the keys.
  968.         objs = d.keys()
  969.         # Reverse it, so that the iteration implementation of __delitem__
  970.         # has to keep looping to find the first object we delete.
  971.         objs.reverse()
  972.  
  973.         # Turn on mutation in C.__eq__.  The first time thru the loop,
  974.         # under the iterkeys() business the first comparison will delete
  975.         # the last item iterkeys() would see, and that causes a
  976.         #     RuntimeError: dictionary changed size during iteration
  977.         # when the iterkeys() loop goes around to try comparing the next
  978.         # key.  After this was fixed, it just deletes the last object *our*
  979.         # "for o in obj" loop would have gotten to.
  980.         mutate = True
  981.         count = 0
  982.         for o in objs:
  983.             count += 1
  984.             del d[o]
  985.         self.assertEqual(len(d), 0)
  986.         self.assertEqual(count, 2)
  987.  
  988. from test import mapping_tests
  989.  
  990. class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
  991.     """Check that WeakValueDictionary conforms to the mapping protocol"""
  992.     __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
  993.     type2test = weakref.WeakValueDictionary
  994.     def _reference(self):
  995.         return self.__ref.copy()
  996.  
  997. class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
  998.     """Check that WeakKeyDictionary conforms to the mapping protocol"""
  999.     __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
  1000.     type2test = weakref.WeakKeyDictionary
  1001.     def _reference(self):
  1002.         return self.__ref.copy()
  1003.  
  1004. libreftest = """ Doctest for examples in the library reference: libweakref.tex
  1005.  
  1006. >>> import weakref
  1007. >>> class Dict(dict):
  1008. ...     pass
  1009. ...
  1010. >>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
  1011. >>> r = weakref.ref(obj)
  1012. >>> print r()
  1013. {'blue': 3, 'green': 2, 'red': 1}
  1014.  
  1015. >>> import weakref
  1016. >>> class Object:
  1017. ...     pass
  1018. ...
  1019. >>> o = Object()
  1020. >>> r = weakref.ref(o)
  1021. >>> o2 = r()
  1022. >>> o is o2
  1023. True
  1024. >>> del o, o2
  1025. >>> print r()
  1026. None
  1027.  
  1028. >>> import weakref
  1029. >>> class ExtendedRef(weakref.ref):
  1030. ...     def __init__(self, ob, callback=None, **annotations):
  1031. ...         super(ExtendedRef, self).__init__(ob, callback)
  1032. ...         self.__counter = 0
  1033. ...         for k, v in annotations.iteritems():
  1034. ...             setattr(self, k, v)
  1035. ...     def __call__(self):
  1036. ...         '''Return a pair containing the referent and the number of
  1037. ...         times the reference has been called.
  1038. ...         '''
  1039. ...         ob = super(ExtendedRef, self).__call__()
  1040. ...         if ob is not None:
  1041. ...             self.__counter += 1
  1042. ...             ob = (ob, self.__counter)
  1043. ...         return ob
  1044. ...
  1045. >>> class A:   # not in docs from here, just testing the ExtendedRef
  1046. ...     pass
  1047. ...
  1048. >>> a = A()
  1049. >>> r = ExtendedRef(a, foo=1, bar="baz")
  1050. >>> r.foo
  1051. 1
  1052. >>> r.bar
  1053. 'baz'
  1054. >>> r()[1]
  1055. 1
  1056. >>> r()[1]
  1057. 2
  1058. >>> r()[0] is a
  1059. True
  1060.  
  1061.  
  1062. >>> import weakref
  1063. >>> _id2obj_dict = weakref.WeakValueDictionary()
  1064. >>> def remember(obj):
  1065. ...     oid = id(obj)
  1066. ...     _id2obj_dict[oid] = obj
  1067. ...     return oid
  1068. ...
  1069. >>> def id2obj(oid):
  1070. ...     return _id2obj_dict[oid]
  1071. ...
  1072. >>> a = A()             # from here, just testing
  1073. >>> a_id = remember(a)
  1074. >>> id2obj(a_id) is a
  1075. True
  1076. >>> del a
  1077. >>> try:
  1078. ...     id2obj(a_id)
  1079. ... except KeyError:
  1080. ...     print 'OK'
  1081. ... else:
  1082. ...     print 'WeakValueDictionary error'
  1083. OK
  1084.  
  1085. """
  1086.  
  1087. __test__ = {'libreftest' : libreftest}
  1088.  
  1089. def test_main():
  1090.     test_support.run_unittest(
  1091.         ReferencesTestCase,
  1092.         MappingTestCase,
  1093.         WeakValueDictionaryTestCase,
  1094.         WeakKeyDictionaryTestCase,
  1095.         )
  1096.     test_support.run_doctest(sys.modules[__name__])
  1097.  
  1098.  
  1099. if __name__ == "__main__":
  1100.     test_main()
  1101.